package kafka

import (
	"context"
	"gitlab-ce.k8s.tools.vchangyi.com/common/go-toolbox/ctx"
	"gitlab-ce.k8s.tools.vchangyi.com/common/go-toolbox/log"
	"gitlab-ce.k8s.tools.vchangyi.com/common/go-toolbox/servers"
)

var addrs []string
var conf *Config
var kafkaConf *KaConfig

func ServerInit(addr []string, config *Config, kafkaConfig *KaConfig) {
	addrs = addr
	conf = config
	kafkaConf = kafkaConfig
}

type ServerGroup struct {
	servers []servers.Server
}

func (s *ServerGroup) Register(server servers.Server) {
	s.servers = append(s.servers, server)
}

func (s *ServerGroup) Servers() []servers.Server {
	return s.servers
}

type ConsumerGroupServer struct {
	groupID        string
	handle         ConsumerFunc
	consumerGroups []ConsumerGroup
	topics         []string
	consumerNum    int
}

// ListenAndServe 启动监听消费服务
func (s *ConsumerGroupServer) ListenAndServe() error {
	for i := 0; i < s.consumerNum; i++ {
		consumer, err := NewConsumerGroup(addrs, s.groupID, conf, kafkaConf)
		if err != nil {
			return err
		}
		err = consumer.Consume(ctx.New(), s.handle, s.topics)
		if err != nil {
			return err
		}
		s.consumerGroups = append(s.consumerGroups, consumer)
	}
	return nil
}

// Shutdown 关闭消费者
func (s *ConsumerGroupServer) Shutdown(c context.Context) (err error) {
	for _, consumer := range s.consumerGroups {
		if er := consumer.Close(); er != nil {
			err = er
			log.Warn(ctx.Wrap(c)).Msgf("close consumer err:%v", err)
		}
	}
	return
}

func (s *ConsumerGroupServer) Name() string {
	return s.groupID
}

// NewConsumerGroupServer 初始化消费组服务
func NewConsumerGroupServer(groupID string, handle ConsumerFunc, topics []string, consumerNum int) *ConsumerGroupServer {
	return &ConsumerGroupServer{
		groupID:     groupID,
		handle:      handle,
		topics:      topics,
		consumerNum: consumerNum,
	}
}
